Parallel computing with Dask#
Context#
We will be using Dask with Xarray to parallelize our data analysis. The analysis is very similar to what we have done in previous episodes, but this time we will use data with global coverage that we read from a shared catalog (stored online in the Pangeo EOSC OpenStack Object Storage).
Data#
In this episode, we will be using the Global Long Term Statistics (1999-2019) product provided by the Copernicus Global Land Service over Lombardia, and access it through S3-compatible storage (OpenStack Object Storage “Swift”) with a data catalog we have created and made publicly available.
Setup#
This episode uses the following Python packages:
pooch [USR+20]
s3fs [S3FsDTeam16]
hvplot [RSB+20]
dask [DaskDTeam16]
graphviz [EGK+03]
numpy [HMvdW+20]
pandas [pdt20]
geopandas [JdBF+20]
Please install these packages if not already available in your Python environment (you might want to take a look at the Setup page of the tutorial).
Packages#
In this episode, Python packages are imported when we start to use them. However, as a best software practice, we recommend installing and importing all the necessary libraries at the top of your Jupyter notebook.
Parallelize with Dask#
We know from the previous chapter chunking_introduction that chunking is key for analyzing large datasets. In this episode, we will learn to parallelize our data analysis using Dask on our chunked dataset.
What is Dask?#
Dask scales the existing Python ecosystem: with few or no changes to your code, you can speed up computation using Dask or process datasets that are bigger than memory.
Dask is a flexible library for parallel computing in Python.
It is widely used for handling large and complex Earth science datasets and speeding up science.
Dask is powerful, scalable and flexible. It is the leading platform today for data analytics at scale.
It scales natively to clusters, cloud and HPC, and bridges prototyping up to production.
The strength of Dask is that it scales and accelerates the existing Python ecosystem, e.g. NumPy, Pandas and Scikit-learn, with little effort from end users.
It is interesting to note that Dask was initially created to handle data larger than memory on a single computer. It was later extended with Distributed to compute data in parallel over clusters of computers.
How does Dask scale and accelerate your data analysis?#
Dask proposes different abstractions to distribute your computation. In this Dask introduction section, we will focus on Dask Array, which is widely used in the Pangeo ecosystem as a back end of Xarray.
As shown in the previous section, a Dask Array is based on chunks. The chunks of a Dask Array are well-known NumPy arrays. By transforming our big datasets into Dask Arrays and making use of chunks, a large array is handled as many smaller NumPy arrays, and we can compute each of these chunks independently.
Xarray uses Dask Arrays instead of NumPy arrays when chunking is enabled, and thus all Xarray operations are performed through Dask, which enables distributed processing.
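A minimal sketch of this idea with Dask Array (the array size and chunk shape below are illustrative choices, not taken from the tutorial data):

```python
import dask.array as da

# A 10000 x 10000 array split into 1000 x 1000 chunks:
# each chunk is an ordinary NumPy array that Dask can process independently.
x = da.ones((10_000, 10_000), chunks=(1_000, 1_000))
print(x.numblocks)   # (10, 10): 100 independent chunks
print(x.chunksize)   # (1000, 1000)

# A reduction is computed chunk by chunk, then aggregated.
total = x.sum().compute()
print(total)         # 100000000.0
```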
How does Xarray with Dask distribute data analysis?#
When we use chunks with Xarray, the real computation is only done when needed or asked for, usually when invoking the compute() function. Dask generates a task graph describing the computations to be done. When using Dask Distributed, a Scheduler distributes these tasks across several Workers.
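A small illustration of this lazy behaviour (the array size here is arbitrary): nothing runs until compute() is called.

```python
import dask.array as da

# Building the array and the mean only records tasks; no data is generated yet.
darr = da.random.uniform(0, 1, size=(1_000, 1_000), chunks=(250, 250))
mean = darr.mean()

print(mean.shape)        # (): a lazy, 0-dimensional dask array

result = mean.compute()  # now the Scheduler runs the task graph on the Workers
print(round(result, 1))  # ~0.5 for uniform samples on [0, 1)
```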

Tip
A Dask client can also be created on a single machine (for instance your laptop), i.e. there is no need for dedicated computational resources. However, the speedup will then be limited to the resources of that single machine.
What is a Dask Cluster?#
A Dask Distributed cluster is made of two main components:
a Scheduler, responsible for handling the computation graph and distributing tasks to the Workers;
one or several (up to thousands of) Workers, computing individual tasks and storing results and data in distributed memory.
To interact with a Dask Cluster, a Client object must also be used.

Dask clusters can be deployed on many backends, either locally or on distributed infrastructures such as Kubernetes, YARN or HPC centers. In these cases, a Cluster object is responsible for deploying and scaling a Dask Cluster on the underlying resources.
Using Dask Distributed on a single server#
There are different methods to use Dask depending on the underlying infrastructure. For this workshop, following the Pangeo EOSC deployment, you will learn how to use Dask Gateway to manage Dask clusters over Kubernetes, allowing us to run our data analysis in parallel, i.e. distributing tasks across several workers.
However, you do not always need access to a multi-node Dask cluster. It is very handy to prototype or even run data analysis on your own laptop or a small server. So let's keep it simple for now and learn how to create a local Dask cluster to distribute some work.
Create a local Dask cluster#
The Dask Client is what allows you to interact with Dask Clusters. When using Dask Distributed, you always need to create a Client object. Once a Client has been created, it will be used by default by each call to a Dask API, even if you do not explicitly use it.
No matter which Dask API you use (e.g. Arrays, DataFrames, Delayed, Futures, etc.), under the hood Dask will create a Directed Acyclic Graph (DAG) of tasks by analysing the code. The Client is responsible for submitting this DAG to the Scheduler along with the final result you want to compute. It also gathers results from the Workers and aggregates them back in its underlying Python process.
Using the Client() function with no argument, you will create a local Dask cluster with a number of workers and threads per worker corresponding to the number of cores of the local machine. Here, we are running this notebook in the cloud, so the number of cores is the number of cores of the cloud computing resources you've been given (not of your laptop).
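The DAG construction can be made visible with dask.delayed (a toy example, not part of the tutorial's workflow): each decorated call records a task instead of running it, and compute() hands the resulting graph to the Scheduler.

```python
import dask


@dask.delayed
def inc(x):
    return x + 1


@dask.delayed
def add(a, b):
    return a + b


# Nothing runs here: each call just adds a node to the task graph.
a = inc(1)
b = inc(2)
total = add(a, b)       # a DAG with two independent inc tasks feeding add

print(total.compute())  # the graph is executed; prints 5
```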
from distributed import Client
client = Client() # create a local dask cluster on the local machine.
client
Client
Client-33125670-223f-11ed-93e3-000d3aee8915
| Connection method: Cluster object | Cluster type: distributed.LocalCluster |
| Dashboard: http://127.0.0.1:8787/status |
Cluster Info
LocalCluster
b759e35d
| Dashboard: http://127.0.0.1:8787/status | Workers: 2 |
| Total threads: 2 | Total memory: 6.78 GiB |
| Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-ac2d6916-5b3f-47b6-8618-f88ad75cf821
| Comm: tcp://127.0.0.1:45831 | Workers: 2 |
| Dashboard: http://127.0.0.1:8787/status | Total threads: 2 |
| Started: Just now | Total memory: 6.78 GiB |
Workers
Worker: 0
| Comm: tcp://127.0.0.1:44895 | Total threads: 1 |
| Dashboard: http://127.0.0.1:34391/status | Memory: 3.39 GiB |
| Nanny: tcp://127.0.0.1:35989 | |
| Local directory: /tmp/dask-worker-space/worker-3ukdgd4q | |
Worker: 1
| Comm: tcp://127.0.0.1:40171 | Total threads: 1 |
| Dashboard: http://127.0.0.1:39367/status | Memory: 3.39 GiB |
| Nanny: tcp://127.0.0.1:42587 | |
| Local directory: /tmp/dask-worker-space/worker-dx7i2law | |
Inspecting the Cluster Info section above gives us information about the created cluster: here we have 2 workers (you may get a different number depending on your resources), each with a single thread.
You can also create a local cluster with the LocalCluster constructor, and use n_workers and threads_per_worker to manually specify the number of processes and threads you want to use. For instance, we could use n_workers=2 and threads_per_worker=2 (the total number of threads would be 4, and each Worker would have 2 threads). This is sometimes preferable (in terms of performance), but out of scope of this tutorial.
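As a sketch, such an explicit configuration could look like the following (processes=False is our own addition here: it keeps the workers as threads in the current process, so the example also runs where spawning worker processes is restricted):

```python
from distributed import Client, LocalCluster

# 2 workers x 2 threads each = 4 threads in total.
cluster = LocalCluster(n_workers=2, threads_per_worker=2, processes=False)
client = Client(cluster)

n_workers = len(cluster.workers)                 # 2
result = client.submit(sum, [1, 2, 3]).result()  # run a small task: 6
print(n_workers, result)

client.close()
cluster.close()
```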
Dask will try to hold data in memory, and then spill it to the hard disk of the worker. If you would like to avoid Dask workers using your local disk (spilling will slow down your computation), you can use the following commands:
import dask.distributed
dask.config.set({"distributed.worker.memory.spill": 0})
Dask Dashboard#
Dask comes with a really handy interface: the Dask Dashboard.
The Dashboard link above will not work, but you can open it using the following link:
https://pangeo-foss4g.vm.fedcloud.eu/jupyterhub/user/user/proxy/8787/status
It's really helpful for understanding your computation and how it is distributed.
A little Dask computation#
Run the cell below, which tries to approximate Pi using Dask Arrays, and watch what is going on in the Dashboard.
import dask.array as da
sample = 1_000_000_000 # <- this is huge!
xxyy = da.random.uniform(-1, 1, size=(2, sample))
norm = da.linalg.norm(xxyy, axis=0)
summ = da.sum(norm <= 1)
insiders = summ.compute()
pi = 4 * insiders / sample
print("pi ~= {}".format(pi))
pi ~= 3.141722304
Local Dask Distributed computations on our dataset#
Let's open the dataset from the catalogue we made before, select a single location, and visualize the task graph generated by Dask.
import xarray as xr
catalogue="https://object-store.cloud.muni.cz/swift/v1/foss4g-catalogue/c_gls_NDVI-LTS_1999-2019.json"
LTS = xr.open_mfdataset(
"reference://", engine="zarr",
backend_kwargs={
"storage_options": {
"fo":catalogue
},
"consolidated": False
}
)
LTS
<xarray.Dataset>
Dimensions: (lat: 15680, lon: 40320, time: 36)
Coordinates:
* lat (lat) float64 80.0 79.99 79.98 79.97 ... -59.97 -59.98 -59.99
* lon (lon) float64 -180.0 -180.0 -180.0 -180.0 ... 180.0 180.0 180.0
* time (time) float64 nan 1.0 2.0 3.0 4.0 5.0 ... 31.0 32.0 33.0 34.0 35.0
Data variables:
crs object ...
max (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
mean (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
median (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
min (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
nobs (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
stdev (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
Attributes: (12/19)
Conventions: CF-1.6
archive_facility: VITO
copyright: Copernicus Service information 2021
history: 2021-03-01 - Processing line NDVI LTS
identifier: urn:cgls:global:ndvi_stats_all:NDVI-LTS_1999-2019-0...
institution: VITO NV
... ...
references: https://land.copernicus.eu/global/products/ndvi
sensor: VEGETATION-1, VEGETATION-2, VEGETATION
source: Derived from EO satellite imagery
time_coverage_end: 2019-12-31T23:59:59Z
time_coverage_start: 1999-01-01T00:00:00Z
title: Normalized Difference Vegetation Index: Long Term S...
save = LTS.sel(lat=45.50, lon=9.36, method='nearest')['min'].mean()
save.data
Did you notice something on the Dask Dashboard when running the two previous cells?
It is because, so far, we didn't execute anything: we just built a Dask task graph, but did not ask Dask to return a result.
If you look at the 'task count', you see 6121 tasks just for computing a mean over 36 temporal steps.
To avoid unnecessary operations, we optimize the task graph using dask.optimize and verify the resulting graph.
Optimize the task graph#
import dask
(save,) = dask.optimize(save)
save.data
Now the number of tasks is reduced to 121. Let's try to visualise it:
save.data.visualize()
Compute on the dask workers#
save.compute()
<xarray.DataArray 'min' ()>
array(0.31844446, dtype=float32)
Coordinates:
lat float64 45.5
lon      float64 9.357
Calling compute on our Xarray object triggered the execution on the Dask Cluster side.
You should be able to see some tasks executing by looking at the Dashboard. Once it is finished, the result of the computation is returned.
Close client to terminate local dask cluster#
The Client and associated LocalCluster object will be automatically closed when your Python session ends. When using Jupyter notebooks, we recommend closing them explicitly whenever you are done with your local Dask cluster.
client.close()
Set up Dask Gateway#
When we want to scale out our data analysis and one machine is not enough, we need to be able to access a multi-node Dask cluster.
On the EOSC Pangeo infrastructure, we can use Dask Gateway to manage Dask clusters and run our data analysis in parallel, i.e. distribute tasks across several workers.
As Dask Gateway is configured by default on this infrastructure, you just need to execute the following cells.
from dask_gateway import Gateway
gateway = Gateway()
Create a new Dask cluster with the Dask Gateway#
cluster = gateway.new_cluster()
cluster.scale(4)
cluster
Open the Dask Dashboard by clicking on the link above!#
Get a client from the Dask Gateway Cluster#
As stated above, creating a Dask Client is mandatory in order to perform the following Dask computations on your Dask Cluster.
## Please don't execute this cell, it is needed for building the Jupyter Book
cluster = None
from distributed import Client
if cluster:
client = Client(cluster) # create a dask Gateway cluster
else:
client = Client() # create a local dask cluster on the machine.
client
Client
Client-4dbb7076-223f-11ed-93e3-000d3aee8915
| Connection method: Cluster object | Cluster type: distributed.LocalCluster |
| Dashboard: http://127.0.0.1:8787/status |
Cluster Info
LocalCluster
bc775d44
| Dashboard: http://127.0.0.1:8787/status | Workers: 2 |
| Total threads: 2 | Total memory: 6.78 GiB |
| Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-96d2d0aa-9462-44c5-b27f-fb52af0a8819
| Comm: tcp://127.0.0.1:40427 | Workers: 2 |
| Dashboard: http://127.0.0.1:8787/status | Total threads: 2 |
| Started: Just now | Total memory: 6.78 GiB |
Workers
Worker: 0
| Comm: tcp://127.0.0.1:33021 | Total threads: 1 |
| Dashboard: http://127.0.0.1:38453/status | Memory: 3.39 GiB |
| Nanny: tcp://127.0.0.1:38369 | |
| Local directory: /tmp/dask-worker-space/worker-8ehniblh | |
Worker: 1
| Comm: tcp://127.0.0.1:39779 | Total threads: 1 |
| Dashboard: http://127.0.0.1:38853/status | Memory: 3.39 GiB |
| Nanny: tcp://127.0.0.1:33497 | |
| Local directory: /tmp/dask-worker-space/worker-wa97k2v4 | |
Global LTS computation#
In the previous episode, we used the Long-term Timeseries for the region of Lombardia, i.e. a very small area that was extracted upfront for simplicity. Now we will use the original dataset that has global coverage, and work directly on it to extract our AOI and perform computations.
Read from online kerchunked consolidated dataset#
We will access the Long Term TimeSeries of NDVI statistics from OpenStack Object Storage using the Zarr metadata generated with kerchunk, prepared in the previous chunking_introduction section.
import pandas as pd
import numpy as np
catalogue="https://object-store.cloud.muni.cz/swift/v1/foss4g-catalogue/c_gls_NDVI-LTS_1999-2019.json"
LTS = xr.open_mfdataset(
"reference://", engine="zarr",
backend_kwargs={
"storage_options": {
"fo":catalogue
},
"consolidated": False
}
)
LTS
<xarray.Dataset>
Dimensions: (lat: 15680, lon: 40320, time: 36)
Coordinates:
* lat (lat) float64 80.0 79.99 79.98 79.97 ... -59.97 -59.98 -59.99
* lon (lon) float64 -180.0 -180.0 -180.0 -180.0 ... 180.0 180.0 180.0
* time (time) float64 nan 1.0 2.0 3.0 4.0 5.0 ... 31.0 32.0 33.0 34.0 35.0
Data variables:
crs object ...
max (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
mean (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
median (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
min (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
nobs (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
stdev (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
Attributes: (12/19)
Conventions: CF-1.6
archive_facility: VITO
copyright: Copernicus Service information 2021
history: 2021-03-01 - Processing line NDVI LTS
identifier: urn:cgls:global:ndvi_stats_all:NDVI-LTS_1999-2019-0...
institution: VITO NV
... ...
references: https://land.copernicus.eu/global/products/ndvi
sensor: VEGETATION-1, VEGETATION-2, VEGETATION
source: Derived from EO satellite imagery
time_coverage_end: 2019-12-31T23:59:59Z
time_coverage_start: 1999-01-01T00:00:00Z
title: Normalized Difference Vegetation Index: Long Term S...
By inspecting any of the variables in the representation above, you'll see that each data array represents about 85GiB of data, much more than what is available on this notebook server, and even on the Dask Cluster we created above. But thanks to chunking, we can still analyze it!
Visualize LTS statistics#
Don’t forget to have a look at the Dashboard when executing the cells below.
import matplotlib.pyplot as plt
fig = plt.figure(1, figsize=[15,5])
# Fix extent
minval = 0.0
maxval = 0.9
itime=0 # plot the first date
# Plot 1 for min subplot argument (nrows, ncols, nplot)
# here 1 row, 2 columns and 1st plot
ax1 = plt.subplot(1, 2, 1)
LTS.isel(time=itime, lat=slice(1000,2000), lon=slice(1000,2000))['min'].plot(ax=ax1)
# Plot 2 for max
# 2nd plot
ax2 = plt.subplot(1, 2, 2)
LTS.isel(time=itime, lat=slice(1000,2000), lon=slice(1000,2000))['max'].plot(ax=ax2)
# Title for both plots
fig.suptitle('LTS NDVI statistics (Minimum and Maximum)', fontsize=20)
Text(0.5, 0.98, 'LTS NDVI statistics (Minimum and Maximum)')
Fix time coordinate#
As observed, the data come with a predefined year. To let Xarray automatically align the LTS with the latest NDVI values, the time dimension needs to be shifted to match the NDVI values.
dates_2022 = pd.date_range('20220101', '20221231')
time_list = dates_2022[np.isin(dates_2022.day, [1,11,21])]
LTS = LTS.assign_coords(time=time_list)
LTS
<xarray.Dataset>
Dimensions: (lat: 15680, lon: 40320, time: 36)
Coordinates:
* lat (lat) float64 80.0 79.99 79.98 79.97 ... -59.97 -59.98 -59.99
* lon (lon) float64 -180.0 -180.0 -180.0 -180.0 ... 180.0 180.0 180.0
* time (time) datetime64[ns] 2022-01-01 2022-01-11 ... 2022-12-21
Data variables:
crs object ...
max (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
mean (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
median (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
min (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
nobs (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
stdev (time, lat, lon) float32 dask.array<chunksize=(1, 1207, 3102), meta=np.ndarray>
Attributes: (12/19)
Conventions: CF-1.6
archive_facility: VITO
copyright: Copernicus Service information 2021
history: 2021-03-01 - Processing line NDVI LTS
identifier: urn:cgls:global:ndvi_stats_all:NDVI-LTS_1999-2019-0...
institution: VITO NV
... ...
references: https://land.copernicus.eu/global/products/ndvi
sensor: VEGETATION-1, VEGETATION-2, VEGETATION
source: Derived from EO satellite imagery
time_coverage_end: 2019-12-31T23:59:59Z
time_coverage_start: 1999-01-01T00:00:00Z
title: Normalized Difference Vegetation Index: Long Term S...
Clip LTS over Lombardia#
As in previous episodes, we use a shapefile over Italy to select data over this Area of Interest (AOI).
import geopandas as gpd
try:
GAUL = gpd.read_file('Italy.geojson')
except:
GAUL = gpd.read_file('zip+https://mars.jrc.ec.europa.eu/asap/files/gaul1_asap.zip')
AOI_name = 'Lombardia'
AOI = GAUL[GAUL.name1 == AOI_name]
AOI_poly = AOI.geometry
AOI_poly
14 POLYGON ((10.23973 46.62177, 10.25084 46.61110...
Name: geometry, dtype: geometry
We first select a geographical area that covers Lombardia (so that we have a first reduction from the global coverage) and then clip using the shapefile to avoid useless pixels.
LTS = LTS.sel(lat=slice(46.5,44.5), lon=slice(8.5,11.5))
LTS.rio.write_crs(4326, inplace=True)
<xarray.Dataset>
Dimensions: (lat: 224, lon: 336, time: 36)
Coordinates:
crs int64 0
* lat (lat) float64 46.49 46.48 46.47 46.46 ... 44.53 44.52 44.51 44.5
* lon (lon) float64 8.5 8.509 8.518 8.527 ... 11.46 11.47 11.48 11.49
* time (time) datetime64[ns] 2022-01-01 2022-01-11 ... 2022-12-21
Data variables:
max (time, lat, lon) float32 dask.array<chunksize=(1, 224, 336), meta=np.ndarray>
mean (time, lat, lon) float32 dask.array<chunksize=(1, 224, 336), meta=np.ndarray>
median (time, lat, lon) float32 dask.array<chunksize=(1, 224, 336), meta=np.ndarray>
min (time, lat, lon) float32 dask.array<chunksize=(1, 224, 336), meta=np.ndarray>
nobs (time, lat, lon) float32 dask.array<chunksize=(1, 224, 336), meta=np.ndarray>
stdev (time, lat, lon) float32 dask.array<chunksize=(1, 224, 336), meta=np.ndarray>
Attributes: (12/19)
Conventions: CF-1.6
archive_facility: VITO
copyright: Copernicus Service information 2021
history: 2021-03-01 - Processing line NDVI LTS
identifier: urn:cgls:global:ndvi_stats_all:NDVI-LTS_1999-2019-0...
institution: VITO NV
... ...
references: https://land.copernicus.eu/global/products/ndvi
sensor: VEGETATION-1, VEGETATION-2, VEGETATION
source: Derived from EO satellite imagery
time_coverage_end: 2019-12-31T23:59:59Z
time_coverage_start: 1999-01-01T00:00:00Z
title: Normalized Difference Vegetation Index: Long Term S...
LTS = LTS.rio.clip(AOI_poly, crs=4326)
LTS
<xarray.Dataset>
Dimensions: (lat: 203, lon: 327, time: 36)
Coordinates:
* lat (lat) float64 46.49 46.48 46.47 46.46 ... 44.71 44.71 44.7 44.69
* lon (lon) float64 8.509 8.518 8.527 8.536 ... 11.39 11.4 11.41 11.42
* time (time) datetime64[ns] 2022-01-01 2022-01-11 ... 2022-12-21
crs int64 0
Data variables:
max (time, lat, lon) float32 dask.array<chunksize=(1, 203, 327), meta=np.ndarray>
mean (time, lat, lon) float32 dask.array<chunksize=(1, 203, 327), meta=np.ndarray>
median (time, lat, lon) float32 dask.array<chunksize=(1, 203, 327), meta=np.ndarray>
min (time, lat, lon) float32 dask.array<chunksize=(1, 203, 327), meta=np.ndarray>
nobs (time, lat, lon) float32 dask.array<chunksize=(1, 203, 327), meta=np.ndarray>
stdev (time, lat, lon) float32 dask.array<chunksize=(1, 203, 327), meta=np.ndarray>
Attributes: (12/19)
Conventions: CF-1.6
archive_facility: VITO
copyright: Copernicus Service information 2021
history: 2021-03-01 - Processing line NDVI LTS
identifier: urn:cgls:global:ndvi_stats_all:NDVI-LTS_1999-2019-0...
institution: VITO NV
... ...
references: https://land.copernicus.eu/global/products/ndvi
sensor: VEGETATION-1, VEGETATION-2, VEGETATION
source: Derived from EO satellite imagery
time_coverage_end: 2019-12-31T23:59:59Z
time_coverage_start: 1999-01-01T00:00:00Z
title: Normalized Difference Vegetation Index: Long Term S...
%%time
LTS.compute()
CPU times: user 2.27 s, sys: 211 ms, total: 2.48 s
Wall time: 26 s
<xarray.Dataset>
Dimensions: (lat: 203, lon: 327, time: 36)
Coordinates:
* lat (lat) float64 46.49 46.48 46.47 46.46 ... 44.71 44.71 44.7 44.69
* lon (lon) float64 8.509 8.518 8.527 8.536 ... 11.39 11.4 11.41 11.42
* time (time) datetime64[ns] 2022-01-01 2022-01-11 ... 2022-12-21
crs int64 0
Data variables:
max (time, lat, lon) float32 nan nan nan nan nan ... nan nan nan nan
mean (time, lat, lon) float32 nan nan nan nan nan ... nan nan nan nan
median (time, lat, lon) float32 nan nan nan nan nan ... nan nan nan nan
min (time, lat, lon) float32 nan nan nan nan nan ... nan nan nan nan
nobs (time, lat, lon) float32 nan nan nan nan nan ... nan nan nan nan
stdev (time, lat, lon) float32 nan nan nan nan nan ... nan nan nan nan
Attributes: (12/19)
Conventions: CF-1.6
archive_facility: VITO
copyright: Copernicus Service information 2021
history: 2021-03-01 - Processing line NDVI LTS
identifier: urn:cgls:global:ndvi_stats_all:NDVI-LTS_1999-2019-0...
institution: VITO NV
... ...
references: https://land.copernicus.eu/global/products/ndvi
sensor: VEGETATION-1, VEGETATION-2, VEGETATION
source: Derived from EO satellite imagery
time_coverage_end: 2019-12-31T23:59:59Z
time_coverage_start: 1999-01-01T00:00:00Z
title: Normalized Difference Vegetation Index: Long Term S...
LTS
<xarray.Dataset>
Dimensions: (lat: 203, lon: 327, time: 36)
Coordinates:
* lat (lat) float64 46.49 46.48 46.47 46.46 ... 44.71 44.71 44.7 44.69
* lon (lon) float64 8.509 8.518 8.527 8.536 ... 11.39 11.4 11.41 11.42
* time (time) datetime64[ns] 2022-01-01 2022-01-11 ... 2022-12-21
crs int64 0
Data variables:
max (time, lat, lon) float32 dask.array<chunksize=(1, 203, 327), meta=np.ndarray>
mean (time, lat, lon) float32 dask.array<chunksize=(1, 203, 327), meta=np.ndarray>
median (time, lat, lon) float32 dask.array<chunksize=(1, 203, 327), meta=np.ndarray>
min (time, lat, lon) float32 dask.array<chunksize=(1, 203, 327), meta=np.ndarray>
nobs (time, lat, lon) float32 dask.array<chunksize=(1, 203, 327), meta=np.ndarray>
stdev (time, lat, lon) float32 dask.array<chunksize=(1, 203, 327), meta=np.ndarray>
Attributes: (12/19)
Conventions: CF-1.6
archive_facility: VITO
copyright: Copernicus Service information 2021
history: 2021-03-01 - Processing line NDVI LTS
identifier: urn:cgls:global:ndvi_stats_all:NDVI-LTS_1999-2019-0...
institution: VITO NV
... ...
references: https://land.copernicus.eu/global/products/ndvi
sensor: VEGETATION-1, VEGETATION-2, VEGETATION
source: Derived from EO satellite imagery
time_coverage_end: 2019-12-31T23:59:59Z
time_coverage_start: 1999-01-01T00:00:00Z
title: Normalized Difference Vegetation Index: Long Term S...
%%time
LTS_min = LTS['min']
(LTS_min,)=dask.optimize(LTS_min)
LTS_min.data.visualize()
CPU times: user 1.28 s, sys: 88.3 ms, total: 1.37 s
Wall time: 1.35 s
%%time
LTS_min.compute()
CPU times: user 329 ms, sys: 41.4 ms, total: 370 ms
Wall time: 4.77 s
<xarray.DataArray 'min' (time: 36, lat: 203, lon: 327)>
array([[[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
...,
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan]],
[[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
...,
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan]],
[[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
...,
...
...,
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan]],
[[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
...,
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan]],
[[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
...,
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan]]], dtype=float32)
Coordinates:
* lat (lat) float64 46.49 46.48 46.47 46.46 ... 44.71 44.71 44.7 44.69
* lon (lon) float64 8.509 8.518 8.527 8.536 ... 11.39 11.4 11.41 11.42
* time (time) datetime64[ns] 2022-01-01 2022-01-11 ... 2022-12-21
crs int64 0
Attributes:
cell_methods: area: mean time: minimum
flag_meanings: sea no_data
flag_values: [254, 255]
grid_mapping: crs
long_name: Minimum Normalized Difference Vegetation Index over time ...
standard_name: normalized_difference_vegetation_index
units:
valid_range: [0, 250]
%%time
LTS_max = LTS['max']
(LTS_max,)=dask.optimize(LTS_max)
LTS_max.data.visualize()
CPU times: user 1.34 s, sys: 131 ms, total: 1.47 s
Wall time: 1.43 s
%%time
LTS_max.compute()
CPU times: user 300 ms, sys: 37.5 ms, total: 337 ms
Wall time: 4.54 s
<xarray.DataArray 'max' (time: 36, lat: 203, lon: 327)>
array([[[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
...,
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan]],
[[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
...,
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan]],
[[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
...,
...
...,
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan]],
[[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
...,
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan]],
[[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
...,
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan]]], dtype=float32)
Coordinates:
* lat (lat) float64 46.49 46.48 46.47 46.46 ... 44.71 44.71 44.7 44.69
* lon (lon) float64 8.509 8.518 8.527 8.536 ... 11.39 11.4 11.41 11.42
* time (time) datetime64[ns] 2022-01-01 2022-01-11 ... 2022-12-21
crs int64 0
Attributes:
cell_methods: area: mean time: maximum
flag_meanings: sea no_data
flag_values: [254, 255]
grid_mapping: crs
long_name: Maximum Normalized Difference Vegetation Index over time ...
standard_name: normalized_difference_vegetation_index
units:
valid_range: [0, 250]
Get NDVI for 2022 over Lombardia#
We re-use the file we created during the first episode. If the file is missing it will be downloaded from Zenodo.
import pooch
try:
cgls_ds = xr.open_dataset('C_GLS_NDVI_20220101_20220701_Lombardia_S3_2_masked.nc')
except:
cgls_file = pooch.retrieve(
url="https://zenodo.org/record/6969999/files/C_GLS_NDVI_20220101_20220701_Lombardia_S3_2_masked.nc",
known_hash="md5:be3f16913ebbdb4e7af227f971007b22",
path=f".",)
cgls_ds = xr.open_dataset(cgls_file)
Downloading data from 'https://zenodo.org/record/6969999/files/C_GLS_NDVI_20220101_20220701_Lombardia_S3_2_masked.nc' to file '/home/runner/work/foss4g-2022/foss4g-2022/tutorial/pangeo101/3bcb6b0879f7f1f3eebff65b142241cf-C_GLS_NDVI_20220101_20220701_Lombardia_S3_2_masked.nc'.
NDVI_AOI = cgls_ds.NDVI.rio.write_crs(4326, inplace=True)
NDVI_AOI = NDVI_AOI.rio.clip(AOI_poly, crs=4326)
NDVI_AOI
<xarray.DataArray 'NDVI' (time: 20, lat: 612, lon: 984)>
array([[[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
...,
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan]],
[[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
...,
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan]],
[[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
...,
...
...,
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan]],
[[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
...,
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan]],
[[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
...,
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan],
[nan, nan, nan, ..., nan, nan, nan]]])
Coordinates:
* time (time) datetime64[ns] 2022-01-01 2022-01-11 ... 2022-07-11
* lon (lon) float64 8.502 8.505 8.508 8.511 ... 11.42 11.42 11.43
* lat (lat) float64 46.5 46.5 46.49 46.49 ... 44.69 44.69 44.68 44.68
spatial_ref int64 0
The nominal spatial resolution of the Long Term Statistics is 1 km. As the current NDVI product has a nominal spatial resolution of 300 m, a reprojection is needed. RioXarray, through RasterIO which wraps the GDAL method, can take care of this. More info about all the options can be found here.
NDVI_1k = NDVI_AOI.rio.reproject_match(LTS)
NDVI_1k = NDVI_1k.rename({'x': 'lon', 'y':'lat'})
VCI = ((NDVI_1k - LTS['min']) / (LTS['max'] - LTS['min'])) * 100
VCI
<xarray.DataArray (time: 20, lat: 203, lon: 327)>
dask.array<mul, shape=(20, 203, 327), dtype=float64, chunksize=(1, 203, 327), chunktype=numpy.ndarray>
Coordinates:
* lon (lon) float64 8.509 8.518 8.527 8.536 ... 11.4 11.41 11.42
* lat (lat) float64 46.49 46.48 46.47 46.46 ... 44.71 44.7 44.69
* time (time) datetime64[ns] 2022-01-01 2022-01-11 ... 2022-07-11
spatial_ref int64 0
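The expression above rescales each pixel's NDVI relative to its long-term minimum and maximum. A minimal NumPy sketch with hypothetical values shows the same computation, and why pixels where the long-term max equals the min produce the divide-by-zero warnings seen when the result is computed:

```python
import numpy as np

ndvi = np.array([0.45, 0.60, 0.30])     # hypothetical current NDVI values
lts_min = np.array([0.20, 0.20, 0.30])  # long-term minimum per pixel
lts_max = np.array([0.70, 0.80, 0.30])  # long-term maximum per pixel

# VCI = (NDVI - min) / (max - min) * 100
with np.errstate(divide="ignore", invalid="ignore"):
    vci = (ndvi - lts_min) / (lts_max - lts_min) * 100

print(vci)  # third pixel is nan: max == min gives 0/0
```

A VCI of 100 means the pixel is at its historical maximum greenness; 0 means it is at its historical minimum.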
    crs          int64 0
VCI.name = 'VCI'
VCI
<xarray.DataArray 'VCI' (time: 20, lat: 203, lon: 327)>
dask.array<mul, shape=(20, 203, 327), dtype=float64, chunksize=(1, 203, 327), chunktype=numpy.ndarray>
Coordinates:
* lon (lon) float64 8.509 8.518 8.527 8.536 ... 11.4 11.41 11.42
* lat (lat) float64 46.49 46.48 46.47 46.46 ... 44.71 44.7 44.69
* time (time) datetime64[ns] 2022-01-01 2022-01-11 ... 2022-07-11
spatial_ref int64 0
crs int64 0%%time
VCI_c = VCI.compute()
/usr/share/miniconda/envs/foss4g/lib/python3.10/site-packages/dask/core.py:119: RuntimeWarning: divide by zero encountered in divide
return func(*(_execute_task(a, cache) for a in args))
CPU times: user 463 ms, sys: 79.8 ms, total: 543 ms
Wall time: 6.16 s
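As the timing cell above shows, `VCI` remains a lazy dask array until `.compute()` is called: building the expression only records a task graph, and the actual work happens at compute time. A minimal `dask.array` sketch (independent of the tutorial data) illustrates this deferred-execution pattern:

```python
import dask.array as da

# Build a lazy array split into 16 chunks; nothing is computed yet
x = da.ones((1000, 1000), chunks=(250, 250))
total = (x * 2).sum()      # still just a task graph
result = total.compute()   # triggers parallel execution across the chunks
print(result)              # 2000000.0
```

Each chunk is processed as an independent task, which is what lets Dask spread the work across threads or distributed workers.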
import hvplot.xarray  # noqa
VCI.hvplot(x='lon', y='lat',
           cmap='RdYlGn', clim=(-200, +200), alpha=0.7,
           geo=True, tiles='CartoLight',
           title=f'CGLS VCI {AOI_name} {VCI.isel(time=-1).time.dt.date.data}',
           width=800, height=700,
           )
Both the catalogue and the original data now live in cloud object storage, so they are accessible even from Dask workers that have no access to your local NFS disk space. You are now ready to parallelize your analysis using Dask workers from Dask Gateway!
client.close()
cluster.shutdown()
Packages citation#
- EGK+03
John Ellson, Emden R. Gansner, Eleftherios Koutsofios, Stephen C. North, and Gordon Woodhull. Graphviz and dynagraph – static and dynamic graph drawing tools. In GRAPH DRAWING SOFTWARE, 127–148. Springer-Verlag, 2003.
- HMvdW+20
Charles R. Harris, K. Jarrod Millman, Stéfan J. van der Walt, Ralf Gommers, Pauli Virtanen, David Cournapeau, Eric Wieser, Julian Taylor, Sebastian Berg, Nathaniel J. Smith, Robert Kern, Matti Picus, Stephan Hoyer, Marten H. van Kerkwijk, Matthew Brett, Allan Haldane, Jaime Fernández del Río, Mark Wiebe, Pearu Peterson, Pierre Gérard-Marchant, Kevin Sheppard, Tyler Reddy, Warren Weckesser, Hameer Abbasi, Christoph Gohlke, and Travis E. Oliphant. Array programming with NumPy. Nature, 585(7825):357–362, September 2020. URL: https://doi.org/10.1038/s41586-020-2649-2, doi:10.1038/s41586-020-2649-2.
- HH17
S. Hoyer and J. Hamman. Xarray: N-D labeled arrays and datasets in Python. Journal of Open Research Software, 2017. URL: https://doi.org/10.5334/jors.148, doi:10.5334/jors.148.
- JdBF+20
Kelsey Jordahl, Joris Van den Bossche, Martin Fleischmann, Jacob Wasserman, James McBride, Jeffrey Gerard, Jeff Tratner, Matthew Perry, Adrian Garcia Badaracco, Carson Farmer, Geir Arne Hjelle, Alan D. Snow, Micah Cochran, Sean Gillies, Lucas Culbertson, Matt Bartos, Nick Eubank, maxalbert, Aleksey Bilogur, Sergio Rey, Christopher Ren, Dani Arribas-Bel, Leah Wasser, Levi John Wolf, Martin Journois, Joshua Wilson, Adam Greenhall, Chris Holdgraf, Filipe, and François Leblanc. Geopandas/geopandas: v0.8.1. July 2020. URL: https://doi.org/10.5281/zenodo.3946761, doi:10.5281/zenodo.3946761.
- pdt20
The pandas development team. Pandas-dev/pandas: pandas. February 2020. URL: https://doi.org/10.5281/zenodo.3509134, doi:10.5281/zenodo.3509134.
- RSB+20
Philipp Rudiger, Jean-Luc Stevens, James A. Bednar, Bas Nijholt, Andrew, Chris B, Achim Randelhoff, Jon Mease, Vasco Tenner, maxalbert, Markus Kaiser, ea42gh, Jordan Samuels, stonebig, Florian LB, Andrew Tolmie, Daniel Stephan, Scott Lowe, John Bampton, henriqueribeiro, Irv Lustig, Julia Signell, Justin Bois, Leopold Talirz, Lukas Barth, Maxime Liquet, Ram Rachum, Yuval Langer, arabidopsis, and kbowen. Holoviz/holoviews: version 1.13.3. June 2020. URL: https://doi.org/10.5281/zenodo.3904606, doi:10.5281/zenodo.3904606.
- USR+20
Leonardo Uieda, Santiago Rubén Soler, Rémi Rampin, Hugo van Kemenade, Matthew Turk, Daniel Shapero, Anderson Banihirwe, and John Leeman. Pooch: a friend to fetch your data files. Journal of Open Source Software, 5(45):1943, 2020. URL: https://doi.org/10.21105/joss.01943, doi:10.21105/joss.01943.
- DaskDTeam16
Dask Development Team. Dask: Library for dynamic task scheduling. 2016. URL: https://dask.org.
- S3FsDTeam16
S3Fs Development Team. S3Fs. 2016. URL: https://github.com/fsspec/s3fs/.